fix: Add generic mechanism to cancel queries on exception #64
📝 Walkthrough
Adds cursor-tracking wrappers to SQL execution:
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Exec as _execute_sql_on_engine
participant TrackerConn as CursorTrackingSQLAlchemyConnection
participant DBAPIConn as DB-API Connection
participant Cursor
participant Database
participant QueryJob as Optional QueryJob
Client->>Exec: request execution(sql, engine)
Exec->>TrackerConn: obtain wrapped connection
TrackerConn->>DBAPIConn: install DB-API wrapper (tracks cursors)
Exec->>TrackerConn: execute(sql)
TrackerConn->>Cursor: cursor() created and registered
Cursor->>Database: run query (long-running)
alt External cancellation (KeyboardInterrupt)
Client->>Exec: KeyboardInterrupt
Exec->>TrackerConn: cancel_all_cursors()
TrackerConn->>Cursor: invoke cursor.cancel()
opt query job present
Exec->>QueryJob: query_job.cancel()
end
Cursor->>Database: request cancel
Database-->>Cursor: cancelled
Exec-->>Client: propagate interrupt/exception
else Normal completion
Database-->>Cursor: results
Cursor-->>Exec: return results
Exec-->>Client: return results
end
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
📦 Python package built successfully!
|
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #64      +/-   ##
==========================================
+ Coverage   73.39%   73.50%   +0.10%
==========================================
  Files          93       93
  Lines        5206     5253      +47
  Branches      758      764       +6
==========================================
+ Hits         3821     3861      +40
- Misses       1143     1147       +4
- Partials      242      245       +3
🚀 Review App Deployment Started
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 525-540: The cursor tracking currently uses a WeakSet and will
raise TypeError for non‑weakrefable cursor objects; update __init__ to also
create a fallback strong set (e.g., self._self_strong_cursor_set = set()),
change cursor(self, *args, **kwargs) to attempt
self._self_cursor_registry.add(cursor) inside a try/except TypeError and on
exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.
- Around line 522-597: Add explicit typing to the new wrappers and helpers:
annotate CursorTrackingDBAPIConnection.__init__(self, wrapped: Any,
cursor_registry: Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.
- Around line 555-566: The _install_dbapi_wrapper currently replaces
self.__wrapped__._dbapi_connection unconditionally, which can create chains when
it's already a CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to
check the existing self.__wrapped__._dbapi_connection first, detect if it's an
instance of CursorTrackingDBAPIConnection (or exposes the registry/registry
holder), and if so reuse that wrapper by attaching/merging self._self_cursors
into the existing wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).
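The first suggestion above (a strong-reference fallback for cursors that cannot be weakly referenced) could look roughly like the sketch below. `CursorRegistry`, `PlainCursor`, and `SlotCursor` are hypothetical names used only for illustration; they are not the toolkit's actual classes, and the fallback assumes cursors are hashable.

```python
import weakref
from typing import Any, List, Set


class CursorRegistry:
    """Hypothetical helper: tracks cursors weakly, with a strong fallback."""

    def __init__(self) -> None:
        self._weak: "weakref.WeakSet[Any]" = weakref.WeakSet()
        # Strong references for cursor types that cannot be weakly referenced.
        self._strong: Set[Any] = set()

    def add(self, cursor: Any) -> None:
        try:
            self._weak.add(cursor)
        except TypeError:
            # Raised when the object does not support weak references,
            # e.g. a class with __slots__ and no __weakref__ slot.
            self._strong.add(cursor)

    def cancel_all(self) -> int:
        """Best-effort cancel; returns how many cancel() calls succeeded."""
        cancelled = 0
        snapshot: List[Any] = list(self._weak) + list(self._strong)
        for cursor in snapshot:
            try:
                cursor.cancel()
                cancelled += 1
            except Exception:
                pass  # best-effort: ignore drivers that fail to cancel
        self._weak.clear()
        self._strong.clear()
        return cancelled


class PlainCursor:
    """Weak-referenceable stand-in cursor."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


class SlotCursor:
    """Stand-in cursor that cannot be weakly referenced."""

    __slots__ = ("cancelled",)

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True
```

The trade-off is that cursors in the strong set stay alive until `cancel_all()` clears them, so the fallback is only appropriate when the registry itself is short-lived.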
In `@tests/unit/test_sql_execution_internal.py`:
- Around line 12-33: Add explicit type hints to the
_setup_mock_engine_with_cursor helper: annotate the parameter mock_cursor as
unittest.mock.Mock (or typing.Any) and the return type as unittest.mock.Mock (or
sqlalchemy.engine.Connection) to match what the function returns; also type the
inner mock_exec_driver_sql callable (e.g., def mock_exec_driver_sql(sql: str,
*args: Any) -> unittest.mock.Mock) and annotate mock_sa_connection and
mock_dbapi_connection variables where declared to clarify their types. Ensure
you import typing.Any and unittest.mock.Mock (or appropriate sqlalchemy types)
at the top of the test file and update the function signature and nested
function signature accordingly.
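For the earlier suggestion about not re-wrapping an already wrapped connection in `_install_dbapi_wrapper`, a minimal sketch of the idempotent check follows. `TrackingConnection`, `ensure_tracking`, `FakeConnection`, and `FakeCursor` are hypothetical stand-ins, not the real `CursorTrackingDBAPIConnection` or any driver class.

```python
import weakref
from typing import Any


class TrackingConnection:
    """Hypothetical stand-in for a cursor-tracking DB-API wrapper."""

    def __init__(self, wrapped: Any) -> None:
        self.wrapped = wrapped
        self.cursors: "weakref.WeakSet[Any]" = weakref.WeakSet()

    def cursor(self, *args: Any, **kwargs: Any) -> Any:
        # Delegate to the real connection and remember the new cursor.
        cur = self.wrapped.cursor(*args, **kwargs)
        self.cursors.add(cur)
        return cur


def ensure_tracking(conn: Any) -> TrackingConnection:
    """Wrap conn once; return the existing wrapper if it is already wrapped."""
    if isinstance(conn, TrackingConnection):
        return conn
    return TrackingConnection(conn)


class FakeCursor:
    """Placeholder cursor used only for the demonstration."""


class FakeConnection:
    """Placeholder raw DB-API connection."""

    def cursor(self) -> FakeCursor:
        return FakeCursor()


raw = FakeConnection()
first = ensure_tracking(raw)
second = ensure_tracking(first)  # no wrapper chain is created
```

Here the caller adopts the registry of the wrapper it gets back rather than merging two registries, which is one of several ways to satisfy the review comment.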
…hen-cancelling-block-execution-3
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 576-582: The _cancel_cursor function currently only attempts
cursor.cancel() but the tests expect cursor.query_job.cancel() to be invoked for
BigQuery cursors; update _cancel_cursor to also check for a query_job attribute
and call cursor.query_job.cancel() when available (e.g., if hasattr(cursor,
"query_job") and hasattr(cursor.query_job, "cancel") then call it), while
preserving the existing try/except best-effort behavior and still attempting
cursor.cancel() when present; ensure both calls are wrapped in the same
exception handling so failures are ignored as before.
In `@tests/unit/test_sql_execution_internal.py`:
- Around line 82-99: The test fails because when a KeyboardInterrupt occurs
_execute_sql_on_engine (via _cancel_cursor) only calls cursor.cancel() but does
not cancel cursor.query_job; update the cancellation logic so that
_cancel_cursor (or the cancellation path inside _execute_sql_on_engine) checks
for a BigQuery query job on the cursor (e.g., cursor.query_job) and calls its
cancel() method before/after calling cursor.cancel(), ensuring both
mock_query_job.cancel() and cursor.cancel() are invoked.
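What the two comments above ask for can be sketched as follows. This is only an illustration of the review suggestion: `cancel_cursor` is a hypothetical standalone version of `_cancel_cursor`, and the PR description notes that for BigQuery the cursor did not actually expose a job reference at the moment of the exception, so the `query_job` attribute is an assumption taken from the test expectations.

```python
from typing import Any
from unittest.mock import MagicMock


def cancel_cursor(cursor: Any) -> None:
    """Best-effort cancellation; all errors are deliberately ignored."""
    # Cancel an attached query job first, if the cursor exposes one.
    try:
        query_job = getattr(cursor, "query_job", None)
        if query_job is not None and hasattr(query_job, "cancel"):
            query_job.cancel()
    except Exception:
        pass
    # Then cancel the cursor itself, if the driver supports it.
    try:
        if hasattr(cursor, "cancel"):
            cursor.cancel()
    except Exception:
        pass


# Usage with a mock cursor whose job cancellation fails.
cursor = MagicMock()
cursor.query_job.cancel.side_effect = RuntimeError("job already finished")
cancel_cursor(cursor)
```

The two calls sit in separate `try` blocks so that a failing job cancellation does not prevent the cursor cancellation from being attempted.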
@coderabbitai pause
✅ Actions performed
Reviews paused.
def cancel_all_cursors(self):
    """Cancel all tracked cursors. Best-effort, ignores errors."""
    for cursor in list(self._self_cursors):
nitpick: is the weakref set not iterable? does it need to be converted to a list?
It is iterable, but it will throw if the set changes during iteration, so it's safer to convert it to a list first: a WeakSet can change even without us doing anything, just because GC happened.
is that really the case? see:
https://stackoverflow.com/a/52693673
https://github.com/python/cpython/blob/3.10/Lib/_weakrefset.py#L11-L15
I did verify it with code like this, but if there is a specific guard against updates by GC in Python, sure, I can remove it:
>>> import weakref
>>>
>>> class A: pass
...
>>> a = A()
>>> b = A()
>>>
>>> ws = weakref.WeakSet([a, b])
>>>
>>> for x in ws:
... ws.remove(a if x == b else b)
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/olegwock/.pyenv/versions/3.12.7/lib/python3.12/_weakrefset.py", line 65, in __iter__
for itemref in self.data:
^^^^^^^^^
RuntimeError: Set changed size during iteration
de3fa24 to 5595abc
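To illustrate the WeakSet discussion above: the traceback comes from an explicit `remove()` inside the loop, whereas the CPython source linked in the thread appears to defer GC-triggered removals while an iterator is active. Either way, iterating over a `list(...)` snapshot is harmless and tolerates explicit mutation. The sketch below uses hypothetical `Cursor` and `drain` names.

```python
import weakref
from typing import Any, List


class Cursor:
    """Placeholder object standing in for a tracked cursor."""

    def __init__(self, name: str) -> None:
        self.name = name


def drain(tracked: "weakref.WeakSet[Any]") -> List[str]:
    """Visit every tracked cursor while removing entries from the set."""
    visited: List[str] = []
    # list(...) takes a snapshot that holds strong references, so removing
    # entries from the WeakSet inside the loop cannot disturb the iteration.
    for cursor in list(tracked):
        visited.append(cursor.name)
        tracked.discard(cursor)
    return sorted(visited)


a, b = Cursor("a"), Cursor("b")
tracked = weakref.WeakSet([a, b])
names = drain(tracked)
```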
m1so left a comment
lgtm with a minor concern about accessing private attributes and SQLAlchemy 1.x and 2.x compatibility
"""Replace SQLAlchemy's internal DBAPI connection with our tracking wrapper."""
try:
    # Access the internal DBAPI connection
    dbapi_conn = self.__wrapped__._dbapi_connection
is there not a public attribute for accessing DBAPI connection that's compatible with 1.x and 2.x?
quick search yielded: https://docs.sqlalchemy.org/en/14/faq/connections.html#how-do-i-get-at-the-raw-dbapi-connection-when-using-an-engine and https://docs.sqlalchemy.org/en/20/faq/connections.html#how-do-i-get-at-the-raw-dbapi-connection-when-using-an-engine
we should also check whether it's possible to write a test case for each supported driver / sqlalchemy connect to verify this proxy class wouldn't emit too many warnings at runtime
I guess there is, I'm so lost in all the layers of SQLAlchemy at this point. But I think it's fine to proxy _dbapi_connection (which is what they call the connection fairy / PoolProxiedConnection): that's what the connection uses when there is a query to run, so there is no need to go down the hierarchy and proxy a lower-level entity.
sure, I am asking about the _dbapi_connection access as it's a private attribute while the docs showcase public ones (notice no underscore):
engine = create_engine(...)
conn = engine.connect()
# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection
# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj
# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection
# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection
I updated the code to use only public properties
This adds a generic system to cancel queries on exception. We now wrap the SQLAlchemy connection to track the cursors it produces, and if we receive a BaseException, we try to cancel them before re-raising.
This was tested with Trino. See instructions for testing Trino in #63.
Unfortunately, for BigQuery I had to keep the old runtime patch, because at the moment of the exception there is no way to get a reference to the job (to cancel it) from the cursor object, and the cursor doesn't provide any method like .cancel().
Summary by CodeRabbit
New Features
Bug Fixes
Tests
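The overall flow described in the PR description (track cursors, cancel them on any `BaseException`, then re-raise) can be sketched as below. This is a simplified illustration, not the toolkit's code: `TrackingConnection`, `execute_with_cancel`, `FakeCursor`, and `FakeConnection` are hypothetical, and the fake cursor simulates an interrupt during `execute()`.

```python
import weakref
from typing import Any, List


class TrackingConnection:
    """Hypothetical wrapper that remembers every cursor it hands out."""

    def __init__(self, wrapped: Any) -> None:
        self._wrapped = wrapped
        self._cursors: "weakref.WeakSet[Any]" = weakref.WeakSet()

    def cursor(self, *args: Any, **kwargs: Any) -> Any:
        cur = self._wrapped.cursor(*args, **kwargs)
        self._cursors.add(cur)
        return cur

    def cancel_all_cursors(self) -> None:
        # Best-effort: errors from individual drivers are ignored.
        for cur in list(self._cursors):
            try:
                cur.cancel()
            except Exception:
                pass


def execute_with_cancel(conn: Any, sql: str) -> Any:
    tracked = TrackingConnection(conn)
    try:
        cur = tracked.cursor()
        cur.execute(sql)
        return cur.fetchall()
    except BaseException:
        # Covers KeyboardInterrupt as well as ordinary exceptions.
        tracked.cancel_all_cursors()
        raise


class FakeCursor:
    """Stand-in cursor that is interrupted while executing."""

    def __init__(self) -> None:
        self.cancelled = False

    def execute(self, sql: str) -> None:
        raise KeyboardInterrupt  # simulate the user cancelling the block

    def fetchall(self) -> List[Any]:
        return []

    def cancel(self) -> None:
        self.cancelled = True


class FakeConnection:
    """Stand-in raw connection that records the cursors it creates."""

    def __init__(self) -> None:
        self.created: List[FakeCursor] = []

    def cursor(self) -> FakeCursor:
        cur = FakeCursor()
        self.created.append(cur)
        return cur
```

Catching `BaseException` rather than `Exception` matters here because `KeyboardInterrupt` does not derive from `Exception`.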